use std::collections::HashMap;
use std::time::{Duration, SystemTime};

use crate::{Error, NatsAuth, Result, TlsConfig, tls};
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
use async_nats::jetstream::{AckKind, Message as JetstreamMessage, consumer};
use async_nats::{
    ConnectOptions,
    jetstream::consumer::{Consumer, pull::Config},
};
use backoff::retry::Retry;
use backoff::strategy::fixed;
use bytes::Bytes;
use chrono::DateTime;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self, Instant, sleep};
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::{error, warn};

#[derive(Debug, Clone, PartialEq)]
pub struct ConsumerDeliverPolicy(DeliverPolicy);

impl ConsumerDeliverPolicy {
    pub const ALL: ConsumerDeliverPolicy = ConsumerDeliverPolicy(DeliverPolicy::All);
    pub const LAST: ConsumerDeliverPolicy = ConsumerDeliverPolicy(DeliverPolicy::Last);
    pub const NEW: ConsumerDeliverPolicy = ConsumerDeliverPolicy(DeliverPolicy::New);
    pub const LAST_PER_SUBJECT: ConsumerDeliverPolicy =
        ConsumerDeliverPolicy(DeliverPolicy::LastPerSubject);

    pub fn by_start_sequence(start_sequence: u64) -> Self {
        Self(DeliverPolicy::ByStartSequence { start_sequence })
    }

    pub fn by_start_time(timestamp: SystemTime) -> Self {
        Self(DeliverPolicy::ByStartTime {
            start_time: timestamp.into(),
        })
    }
}

impl TryFrom<&str> for ConsumerDeliverPolicy {
    type Error = Error;

    fn try_from(value: &str) -> Result<Self> {
        let fields = value
            .split_ascii_whitespace()
            .map(|v| v.trim().to_lowercase())
            .collect::<Vec<String>>();
        let fields = fields.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();

        let policy = match fields[..] {
            ["all"] => Self(DeliverPolicy::All),
            ["last"] => Self(DeliverPolicy::Last),
            ["last_per_subject"] => Self(DeliverPolicy::LastPerSubject),
            ["new"] => Self(DeliverPolicy::New),
            ["by_start_sequence", sequence] => {
                let sequence_id = sequence.parse::<u64>().map_err(|err| {
                    Error::Other(format!(
                        "start_sequence '{sequence}' is not a valid unsigned integer: {err:?}"
                    ))
                })?;
                Self(DeliverPolicy::ByStartSequence {
                    start_sequence: sequence_id,
                })
            }
            ["by_start_time", start_time] => {
                let epoch_ms = start_time.parse::<i64>().map_err(|err| {
                    Error::Other(format!(
                        "epoch time should be in milliseconds specified as a valid integer: {err:?}"
                    ))
                })?;
                let duration = Duration::from_millis(epoch_ms.unsigned_abs());
                let timestamp = match epoch_ms.is_positive() {
                    true => SystemTime::UNIX_EPOCH.checked_add(duration),
                    false => SystemTime::UNIX_EPOCH.checked_sub(duration),
                }.ok_or(
                    Error::Other(format!(
                        "Specified epoch_time '{start_time}' is out of bounds to be represented as system time"
                    ))
                )?;

                Self(DeliverPolicy::ByStartTime {
                    start_time: timestamp.into(),
                })
            }
            _ => {
                return Err(Error::Other(format!(
                    "Invalid option for DeliverPolicy: {value}"
                )));
            }
        };
        Ok(policy)
    }
}

#[derive(Debug, Clone, PartialEq)]
pub struct JetstreamSourceConfig {
    pub addr: String,
    pub stream: String,
    pub consumer: String,
    pub deliver_policy: ConsumerDeliverPolicy,
    pub filter_subjects: Vec<String>,
    pub auth: Option<NatsAuth>,
    pub tls: Option<TlsConfig>,
}

/// Message represents a message received from Jetstream which can be converted to Numaflow Message.
#[derive(Debug)]
pub struct Message {
    /// The user payload.
    pub value: Bytes,
    /// Monotonically increasing sequence number as generated by JetStream.
    pub stream_sequence: u64,
    pub published_timestamp: DateTime<chrono::Utc>,
    pub headers: HashMap<String, String>,
}

impl TryFrom<JetstreamMessage> for Message {
    type Error = Error;
    fn try_from(msg: JetstreamMessage) -> Result<Self> {
        let headers = match msg.message.headers.as_ref() {
            Some(headers) => headers
                .iter()
                .map(|(k, v)| {
                    (
                        k.to_string(),
                        v.first().map(|v| v.to_string()).unwrap_or_default(),
                    )
                }) // NOTE: we are only using the first value of the header
                .collect(),
            None => HashMap::new(),
        };

        let stream_sequence = msg
            .info()
            .map_err(|e| {
                Error::Jetstream(format!("fetching message metadata from Jetstream: {e:?}"))
            })?
            .stream_sequence;

        let published_timestamp = msg
            .info()
            .map_err(|e| {
                Error::Jetstream(format!("fetching message metadata from Jetstream: {e:?}"))
            })?
            .published;

        Ok(Message {
            value: msg.message.payload,
            stream_sequence,
            headers,
            published_timestamp: DateTime::from_timestamp(
                published_timestamp.unix_timestamp(),
                published_timestamp.nanosecond(),
            )
            .expect("Failed to convert timestamp to DateTime"),
        })
    }
}

enum JetstreamActorMessage {
    Read {
        respond_to: oneshot::Sender<Result<Vec<Message>>>,
    },
    Ack {
        offsets: Vec<u64>,
        respond_to: oneshot::Sender<Result<()>>,
    },
    Nack {
        offsets: Vec<u64>,
        respond_to: oneshot::Sender<Result<()>>,
    },
    Pending {
        respond_to: oneshot::Sender<Result<Option<usize>>>,
    },
}

struct JetstreamActor {
    consumer: Consumer<Config>,
    read_timeout: Duration,
    batch_size: usize,
    in_progress_messages: HashMap<u64, MessageProcessingTracker>,
    handler_rx: mpsc::Receiver<JetstreamActorMessage>,
    cancel_token: CancellationToken,
}

impl JetstreamActor {
    async fn start(
        config: JetstreamSourceConfig,
        batch_size: usize,
        read_timeout: Duration,
        handler_rx: mpsc::Receiver<JetstreamActorMessage>,
        cancel_token: CancellationToken,
    ) -> Result<()> {
        let mut conn_opts = ConnectOptions::new()
            .max_reconnects(None) // unlimited reconnects
            .reconnect_delay_callback(|attempts| {
                std::time::Duration::from_millis(std::cmp::min((attempts * 10) as u64, 1000))
            })
            .ping_interval(Duration::from_secs(3))
            .retry_on_initial_connect();
        if let Some(auth) = config.auth {
            conn_opts = match auth {
                NatsAuth::Basic { username, password } => {
                    conn_opts.user_and_password(username, password)
                }
                NatsAuth::NKey(nkey) => conn_opts.nkey(nkey),
                NatsAuth::Token(token) => conn_opts.token(token),
            };
        }
        if let Some(tls_config) = config.tls {
            conn_opts = tls::configure_tls(conn_opts, tls_config)?;
        }
        let client = async_nats::connect_with_options(&config.addr, conn_opts)
            .await
            .map_err(|err| Error::Connection {
                server: config.addr.to_string(),
                error: err.to_string(),
            })?;

        let js_ctx = async_nats::jetstream::new(client);
        let consumer = js_ctx
            .get_consumer_from_stream(&config.consumer, &config.stream)
            .await;

        let consumer = match consumer {
            Ok(consumer) => consumer,
            Err(e) => {
                let async_nats::jetstream::stream::ConsumerErrorKind::JetStream(e) = e.kind()
                else {
                    return Err(Error::Jetstream(format!(
                        "Getting consumer {} from Jetstream stream {}: {e:?}",
                        config.consumer, config.stream
                    )));
                };

                if e.error_code() != async_nats::jetstream::ErrorCode::CONSUMER_NOT_FOUND {
                    return Err(Error::Jetstream(format!(
                        "Getting consumer {} from Jetstream stream {}: {e:?}",
                        config.consumer, config.stream
                    )));
                }

                tracing::info!(
                    "Consumer {} not found on stream {}. Creating new consumer.",
                    config.consumer,
                    config.stream
                );
                js_ctx
                    .create_consumer_on_stream(
                        consumer::pull::Config {
                            durable_name: Some(config.consumer.clone()),
                            description: Some("Numaflow Jetstream Consumer".into()),
                            deliver_policy: config.deliver_policy.0,
                            ack_policy: AckPolicy::Explicit,
                            filter_subjects: config.filter_subjects,
                            ..Default::default()
                        },
                        &config.stream,
                    )
                    .await
                    .map_err(|e| {
                        Error::Jetstream(format!(
                            "Creating consumer {} on stream {}: {e:?}",
                            config.consumer, config.stream
                        ))
                    })?
            }
        };

        tokio::spawn(async move {
            let mut actor = JetstreamActor {
                consumer,
                read_timeout,
                batch_size,
                in_progress_messages: HashMap::new(),
                handler_rx,
                cancel_token,
            };
            tracing::info!("Starting Jetstream...");
            actor.run().await;
        });

        Ok(())
    }

    async fn run(&mut self) {
        while let Some(msg) = self.handler_rx.recv().await {
            self.handle_message(msg).await;
        }
    }

    async fn handle_message(&mut self, msg: JetstreamActorMessage) {
        match msg {
            JetstreamActorMessage::Read { respond_to } => {
                let messages = self.read_messages().await;
                let _ = respond_to.send(messages);
            }
            JetstreamActorMessage::Ack {
                offsets,
                respond_to,
            } => {
                let status = self.ack_messages(offsets).await;
                let _ = respond_to.send(status);
            }
            JetstreamActorMessage::Nack {
                offsets,
                respond_to,
            } => {
                let status = self.nack_messages(offsets).await;
                let _ = respond_to.send(status);
            }
            JetstreamActorMessage::Pending { respond_to } => {
                let pending = self.pending_messages().await;
                let _ = respond_to.send(pending);
            }
        }
    }

    /// Reads messages in batch from the Jetstream with honoring timeouts.
    async fn read_messages(&mut self) -> Result<Vec<Message>> {
        let mut messages: Vec<Message> = Vec::with_capacity(self.batch_size);

        let mut batch = match self
            .consumer
            .batch()
            .max_messages(self.batch_size)
            .expires(self.read_timeout)
            .messages()
            .await
        {
            Ok(batch) => batch,
            Err(e) => {
                warn!(
                    ?e,
                    "Failed to get message batch from Jetstream (ignoring, will be retried)"
                );
                return Ok(Vec::new());
            }
        };

        while let Some(message) = batch.next().await {
            let message = match message {
                Ok(msg) => msg,
                Err(e) => {
                    warn!(?e, "Failed to fetch a message from batch, retrying...");
                    sleep(Duration::from_millis(RETRY_INTERVAL)).await;
                    continue;
                }
            };

            messages.push(self.process_message(message).await?);
        }

        tracing::debug!(msg_count = messages.len(), "Read batch from Jetstream");
        Ok(messages)
    }

    async fn ack_messages(&mut self, offsets: Vec<u64>) -> Result<()> {
        let mut tasks = Vec::with_capacity(offsets.len());

        for offset in offsets {
            let msg_task = self.in_progress_messages.remove(&offset);
            let Some(msg_task) = msg_task else {
                warn!(offset, "Received ACK request for unknown offset");
                continue;
            };

            // msg_task.ack() involves sending on a oneshot channel to an already running tokio task
            // This results in sending Ack to Nats server (within the tokio task) and awaiting for
            // the task to finish. We spawn tasks here so that acks happens concurrently.
            let task = tokio::spawn(async move {
                msg_task.ack().await;
            });
            tasks.push(task);
        }

        for task in tasks {
            if let Err(err) = task.await {
                return Err(Error::Other(format!("Error in ack task: {err:?}")));
            }
        }
        Ok(())
    }

    async fn nack_messages(&mut self, offsets: Vec<u64>) -> Result<()> {
        let mut tasks = Vec::with_capacity(offsets.len());

        for offset in offsets {
            let msg_task = self.in_progress_messages.remove(&offset);
            let Some(msg_task) = msg_task else {
                warn!(offset, "Received NACK request for unknown offset");
                continue;
            };

            // msg_task.nack() involves sending on a oneshot channel to an already running tokio task
            // This results in sending Nack to Nats server (within the tokio task) and awaiting for
            // the task to finish. We spawn tasks here so that nacks happens concurrently.
            let task = tokio::spawn(async move {
                msg_task.nack().await;
            });
            tasks.push(task);
        }

        for task in tasks {
            if let Err(err) = task.await {
                return Err(Error::Other(format!("Error in nack task: {err:?}")));
            }
        }
        Ok(())
    }

    async fn process_message(&mut self, js_message: JetstreamMessage) -> Result<Message> {
        // we need to clone because the message should be held around for ack
        let message: Message = js_message.clone().try_into().map_err(|e| {
            Error::Jetstream(format!(
                "converting raw Jetstream message as Numaflow source message: {e:?}"
            ))
        })?;

        // we need to start WIP ack because some processing can be quite slow and we have to avoid
        // redelivery.
        let tick_interval = self.consumer.cached_info().config.ack_wait / 2;
        let message_tracker =
            MessageProcessingTracker::start(js_message, tick_interval, self.cancel_token.clone())
                .await;
        self.in_progress_messages
            .insert(message.stream_sequence, message_tracker);

        Ok(message)
    }

    pub async fn pending_messages(&mut self) -> Result<Option<usize>> {
        let x = self
            .consumer
            .info()
            .await
            .map_err(|e| Error::Jetstream(format!("Failed to get consumer info: {e:?}")))?;

        Ok(Some(x.num_pending as usize + x.num_ack_pending))
    }
}

#[derive(Clone)]
pub struct JetstreamSource {
    actor_tx: mpsc::Sender<JetstreamActorMessage>,
}

impl JetstreamSource {
    pub async fn connect(
        config: JetstreamSourceConfig,
        batch_size: usize,
        read_timeout: Duration,
        cancel_token: CancellationToken,
    ) -> Result<Self> {
        let (tx, rx) = mpsc::channel(10);
        JetstreamActor::start(config, batch_size, read_timeout, rx, cancel_token).await?;
        Ok(Self { actor_tx: tx })
    }

    pub async fn read_messages(&self) -> Result<Vec<Message>> {
        let (tx, rx) = oneshot::channel();
        let msg = JetstreamActorMessage::Read { respond_to: tx };
        let _ = self.actor_tx.send(msg).await;
        rx.await
            .map_err(|_| Error::Other("Actor task terminated".into()))?
    }

    pub async fn ack_messages(&self, offsets: Vec<u64>) -> Result<()> {
        let (tx, rx) = oneshot::channel();
        let msg = JetstreamActorMessage::Ack {
            offsets,
            respond_to: tx,
        };
        let _ = self.actor_tx.send(msg).await;
        rx.await
            .map_err(|_| Error::Other("Actor task terminated".into()))?
    }

    pub async fn nack_messages(&self, offsets: Vec<u64>) -> Result<()> {
        let (tx, rx) = oneshot::channel();
        let msg = JetstreamActorMessage::Nack {
            offsets,
            respond_to: tx,
        };
        let _ = self.actor_tx.send(msg).await;
        rx.await
            .map_err(|_| Error::Other("Actor task terminated".into()))?
    }

    pub async fn pending_messages(&self) -> Result<Option<usize>> {
        let (tx, rx) = oneshot::channel();
        let msg = JetstreamActorMessage::Pending { respond_to: tx };
        let _ = self.actor_tx.send(msg).await;
        rx.await
            .map_err(|_| Error::Other("Actor task terminated".into()))?
    }
}

/// The MessageProcessingTracker object is a handle to the work-in-progress background task for a
/// message. The background task periodically sends `AckKind::Progress` to the Nats server while the
/// message is being processed. This handle will be tracked as value in a hashmap in the Jetstream
/// source handle where the key will be the corresponding message sequence id (offset).
/// When the `Sourcer` trait's ack is called, the `MessageProcessingTracker::ack` gets called, which
/// results in marking the message processing completion in Nats server and termination of the
/// work-in-progress task.
struct MessageProcessingTracker {
    in_progress_task: JoinHandle<()>,
    ack_signal_tx: oneshot::Sender<AckKind>,
}

// same as rust/numaflow-core/src/pipeline/isb/jestream/reader.rs
const RETRY_INTERVAL: u64 = 100;
const RETRY_ATTEMPTS: usize = usize::MAX;

impl MessageProcessingTracker {
    async fn start(msg: JetstreamMessage, tick: Duration, cancel_token: CancellationToken) -> Self {
        let (ack_signal_tx, ack_signal_rx) = oneshot::channel();
        let task = tokio::spawn(Self::start_work_in_progress(
            msg,
            tick,
            ack_signal_rx,
            cancel_token,
        ));
        Self {
            in_progress_task: task,
            ack_signal_tx,
        }
    }

    /// Starts a background task to send an ACK to Jetstream for the message. It will do the final
    /// ACK/NACK after the task is completed.
    async fn start_work_in_progress(
        msg: JetstreamMessage,
        tick: Duration,
        ack_signal_rx: oneshot::Receiver<AckKind>,
        cancel_token: CancellationToken,
    ) {
        let start = Instant::now();
        let mut interval = time::interval_at(start + tick, tick);

        let ack_in_progress = async || {
            if cancel_token.is_cancelled() {
                return;
            }
            let ack_result = msg.ack_with(AckKind::Progress).await;
            if let Err(e) = ack_result {
                error!(?e, "Failed to send InProgress Ack to Jetstream for message");
            }
        };

        tokio::pin!(ack_signal_rx);

        loop {
            let ack = tokio::select! {
                biased;

                ack = &mut ack_signal_rx => ack,
                _ = interval.tick() => {
                    ack_in_progress().await;
                    continue;
                },
            };

            match ack {
                Ok(ack_kind) => {
                    Self::invoke_ack_with_retry(&msg, ack_kind, &cancel_token).await;
                }
                Err(e) => {
                    error!(?e, "Received error while waiting for Ack oneshot channel");
                    Self::invoke_ack_with_retry(&msg, AckKind::Nak(None), &cancel_token).await;
                }
            }
            break;
        }
    }

    // invokes the ack with infinite retries until the cancellation token is cancelled.
    async fn invoke_ack_with_retry(
        msg: &JetstreamMessage,
        ack_kind: AckKind,
        cancel_token: &CancellationToken,
    ) {
        let interval = fixed::Interval::from_millis(RETRY_INTERVAL).take(RETRY_ATTEMPTS);
        let _ = Retry::new(
            interval,
            async || {
                let result = match ack_kind {
                    AckKind::Ack => msg.double_ack().await, // double ack is used for exactly once semantics
                    _ => msg.ack_with(ack_kind).await,
                };

                result.map_err(|e| {
                    Error::Jetstream(format!("Failed to send {ack_kind:?} to Jetstream: {e:?}"))
                })
            },
            |e: &Error| {
                if cancel_token.is_cancelled() {
                    error!(
                        ?e,
                        "Cancellation token received, stopping the {ack_kind:?} retry loop"
                    );
                    return false;
                }

                warn!(
                    ?e,
                    "Failed to send {ack_kind:?} Ack to Jetstream for message, retrying..."
                );
                true
            },
        )
        .await;
    }

    async fn ack(self) {
        let Self {
            in_progress_task,
            ack_signal_tx,
        } = self;
        if let Err(err) = ack_signal_tx.send(AckKind::Ack) {
            error!(
                ?err,
                "Background task to mark the message status as in-progress is already terminated"
            );
            return;
        }
        let _ = in_progress_task.await;
    }

    async fn nack(self) {
        let Self {
            in_progress_task,
            ack_signal_tx,
        } = self;
        if let Err(err) = ack_signal_tx.send(AckKind::Nak(None)) {
            error!(
                ?err,
                "Background task to mark the message status as in-progress is already terminated"
            );
            return;
        }
        let _ = in_progress_task.await;
    }
}

#[cfg(test)]
mod tests {

    use super::*;

    use async_nats::jetstream::Context;
    use async_nats::jetstream::stream::Config as StreamConfig;
    use std::time::Duration;

    async fn setup_jetstream(stream_name: &str, create_consumer: bool) -> Context {
        let client = async_nats::connect("localhost").await.unwrap();
        let js = async_nats::jetstream::new(client);

        let _ = js.delete_stream(stream_name).await;
        let stream = js
            .get_or_create_stream(StreamConfig {
                name: stream_name.to_string(),
                ..Default::default()
            })
            .await
            .unwrap();

        if create_consumer {
            stream
                .get_or_create_consumer(
                    stream_name,
                    Config {
                        durable_name: Some(stream_name.to_string()),
                        ..Default::default()
                    },
                )
                .await
                .unwrap();
        }

        js
    }

    async fn source_functionality_test(
        source: JetstreamSource,
        js: Context,
        stream_name: &'static str,
    ) {
        for i in 0..100 {
            js.publish(stream_name, format!("message {}", i).into())
                .await
                .unwrap();
        }
        let read_timeout = Duration::from_secs(1);
        // Read messages
        let messages = source.read_messages().await.unwrap();
        assert_eq!(messages.len(), 30);
        let pending = source.pending_messages().await.unwrap();
        assert_eq!(
            pending,
            Some(100),
            "Pending messages should include unacknowledged messages"
        );

        // Ack messages
        let offsets: Vec<u64> = messages.iter().map(|msg| msg.stream_sequence).collect();
        source.ack_messages(offsets).await.unwrap();

        // Check pending messages
        // If checked immediately, Nats server intermittently returns 1 more than the actual value
        tokio::time::sleep(Duration::from_millis(30)).await;
        let pending = source.pending_messages().await.unwrap();
        assert_eq!(
            pending,
            Some(70),
            "Pending messages should be 70 after acking 30 messages"
        );

        // Read remaining messages
        let messages = source.read_messages().await.unwrap();
        assert_eq!(messages.len(), 30);

        // Ack remaining messages
        let offsets: Vec<u64> = messages.iter().map(|msg| msg.stream_sequence).collect();
        source.ack_messages(offsets).await.unwrap();

        // Check pending messages
        // If checked immediately, Nats server intermittently returns 1 more than the actual value
        tokio::time::sleep(Duration::from_millis(30)).await;
        let pending = source.pending_messages().await.unwrap();
        assert_eq!(
            pending,
            Some(40),
            "Pending messages should be 40 after acking another 30 messages"
        );

        // Read remaining messages
        let messages = source.read_messages().await.unwrap();
        assert_eq!(messages.len(), 30);

        // Ack remaining messages
        let offsets: Vec<u64> = messages.iter().map(|msg| msg.stream_sequence).collect();
        source.ack_messages(offsets).await.unwrap();

        // Check pending messages
        tokio::time::sleep(Duration::from_millis(30)).await;
        let pending = source.pending_messages().await.unwrap();
        assert_eq!(
            pending,
            Some(10),
            "Pending messages should be 10 after acking another 30 messages"
        );

        // Read remaining messages
        let messages = source.read_messages().await.unwrap();
        assert_eq!(messages.len(), 10);

        // Ack remaining messages
        let offsets: Vec<u64> = messages.iter().map(|msg| msg.stream_sequence).collect();
        source.ack_messages(offsets).await.unwrap();

        // Check pending messages
        tokio::time::sleep(Duration::from_millis(30)).await;
        let pending = source.pending_messages().await.unwrap();
        assert_eq!(
            pending,
            Some(0),
            "Pending messages should be 0 after acking all messages"
        );

        // Ensure read operation returns after the read timeout
        let start = Instant::now();
        let messages = source.read_messages().await.unwrap();
        let elapsed = start.elapsed();
        assert!(
            elapsed < read_timeout + Duration::from_millis(100),
            "Read operation should return in 1 second"
        );
        assert!(
            messages.is_empty(),
            "No messages should be returned after all messages are acked"
        );
    }

    #[cfg(feature = "nats-tests")]
    #[tokio::test]
    async fn test_jetstream_source_consumer_exists_on_stream() {
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

        let stream_name = "test_jetstream_source_consumer_exists_on_stream";
        let js = setup_jetstream(stream_name, true).await;
        let config = JetstreamSourceConfig {
            addr: "localhost".to_string(),
            stream: stream_name.into(),
            consumer: stream_name.into(),
            deliver_policy: ConsumerDeliverPolicy::ALL,
            filter_subjects: vec![],
            auth: None,
            tls: None,
        };
        let cancel_token = CancellationToken::new();
        let source = JetstreamSource::connect(config, 30, Duration::from_secs(1), cancel_token)
            .await
            .unwrap();

        source_functionality_test(source, js, stream_name).await;
    }

    #[cfg(feature = "nats-tests")]
    #[tokio::test]
    async fn test_jetstream_source_consumer_not_exists_on_stream() {
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

        let stream_name = "test_jetstream_source_consumer_not_exists_on_stream";
        let js = setup_jetstream(stream_name, false).await;
        let config = JetstreamSourceConfig {
            addr: "localhost".to_string(),
            stream: stream_name.into(),
            consumer: stream_name.into(),
            deliver_policy: ConsumerDeliverPolicy::ALL,
            filter_subjects: vec![],
            auth: None,
            tls: None,
        };
        let cancel_token = CancellationToken::new();
        let source = JetstreamSource::connect(config, 30, Duration::from_secs(1), cancel_token)
            .await
            .unwrap();

        source_functionality_test(source, js, stream_name).await;
    }

    #[test]
    fn test_try_from_str_for_consumer_deliver_policy() {
        let policy: ConsumerDeliverPolicy = "all".try_into().unwrap();
        assert_eq!(policy, ConsumerDeliverPolicy::ALL);

        let policy: ConsumerDeliverPolicy = "AlL".try_into().unwrap();
        assert_eq!(policy, ConsumerDeliverPolicy::ALL);

        let policy: ConsumerDeliverPolicy = "last".try_into().unwrap();
        assert_eq!(policy, ConsumerDeliverPolicy::LAST);

        let policy: ConsumerDeliverPolicy = "lAsT".try_into().unwrap();
        assert_eq!(policy, ConsumerDeliverPolicy::LAST);

        let policy: ConsumerDeliverPolicy = "new".try_into().unwrap();
        assert_eq!(policy, ConsumerDeliverPolicy::NEW);

        let policy: ConsumerDeliverPolicy = "NEW".try_into().unwrap();
        assert_eq!(policy, ConsumerDeliverPolicy::NEW);

        let policy: ConsumerDeliverPolicy = "last_per_subject".try_into().unwrap();
        assert_eq!(policy, ConsumerDeliverPolicy::LAST_PER_SUBJECT);

        let policy: ConsumerDeliverPolicy = "last_PER_suBJEct".try_into().unwrap();
        assert_eq!(policy, ConsumerDeliverPolicy::LAST_PER_SUBJECT);

        let policy: ConsumerDeliverPolicy = "by_start_sequence 768".try_into().unwrap();
        assert_eq!(policy, ConsumerDeliverPolicy::by_start_sequence(768));

        let policy: Result<ConsumerDeliverPolicy> = "by_start_sequence 7invalid8".try_into();
        let policy_parse_err = policy.unwrap_err();

        assert!(
            matches!(policy_parse_err, Error::Other(v) if v == "start_sequence '7invalid8' is not a valid unsigned integer: ParseIntError { kind: InvalidDigit }")
        );

        let policy: ConsumerDeliverPolicy = "by_start_time 1753428483000".try_into().unwrap();
        assert_eq!(
            policy,
            ConsumerDeliverPolicy::by_start_time(
                SystemTime::UNIX_EPOCH + Duration::from_millis(1753428483000)
            )
        );

        let policy: Result<ConsumerDeliverPolicy> = "by_start_time 175invalid3428483000".try_into();
        let policy_parse_err = policy.unwrap_err();

        assert!(
            matches!(policy_parse_err, Error::Other(v) if v == "epoch time should be in milliseconds specified as a valid integer: ParseIntError { kind: InvalidDigit }")
        );
    }
}
